#include "config.h"

#include <stdbool.h>
#include <fcntl.h>
#include <sys/types.h>
#include <unistd.h>

#define DBG_SUBSYS S_LIBSTORAGE

#include "dbg.h"

#include "schedule.h"
#include "core.h"
#include "coroutine.h"
#include "volume_proto.h"

#include "lsv_wbuffer.h"
#include "lsv_wbuffer_internal.h"

extern uint32_t lsv_feature;

/*
 * Allocate and initialise the write-buffer state of a volume.
 *
 * A zero-filled lsv_wbuf_t is allocated with ymalloc() and stored in
 * lsv_info->wbuf. Its co_mq queues, co_cond objects, pipeline and QoS state
 * are then initialised, and all LSN/PSN counters start at 0.
 *
 * @param lsv_info  volume whose ->wbuf pointer receives the new buffer state
 * @return 0 on success, or the non-zero ymalloc() result if allocation fails
 */
int lsv_wbuffer_mem_init(lsv_volume_proto_t *lsv_info) {
        lsv_wbuf_t *wb;
        int ret;

        ret = ymalloc((void **)&lsv_info->wbuf, sizeof(lsv_wbuf_t));
        if (unlikely(ret)) {
                GOTO(err_ret, ret);
        }

        wb = (lsv_wbuf_t *) lsv_info->wbuf;
        memset(wb, 0, sizeof(*wb));

        // queue and wake-up primitives
        co_mq_init(&wb->io_mq, "io_mq");
        co_cond_init(&wb->io_cond);
        co_cond_init(&wb->flush_cond);
        co_cond_init(&wb->full_cond);

        pipeline_init(&wb->pipeline, lsv_info);

        co_mq_init_batch(&wb->wbuf2log_stream, "wbuf2log_task", 2);

        lsv_wbuf_qos_init(&wb->qos, 1000000);

        // Sequence counters. The memset above already zeroed them; they are
        // assigned again here so the starting values are explicit.
        wb->last_lsn = 0;
        wb->last_psn = 0;
        wb->commit_lsn = 0;
        wb->commit_psn = 0;
        wb->__last_lsn = 0;
        wb->__last_psn = 0;

        return 0;
err_ret:
        return ret;
}

/*
 * Task body queued by lsv_wbuffer2log(): write one chunk to the log and then
 * run its post-commit step.
 *
 * @param arg  lsv_wbuf2log_ctx_t carrying the volume and the chunk to write
 * @return always 0; a log-write failure is only logged here and handed on to
 *         chunk_post_commit(), which asserts that the value it receives is 0
 *
 * NOTE(review): the context is not released in this function. Confirm
 * whether the queue that runs the task owns and frees it.
 */
static int __do_wbuf2log(void *arg) {
        lsv_wbuf2log_ctx_t *task = arg;
        int err;

        DINFO("begin\n");

        err = lsv_log_write(task->lsv_info, task->chunk_buf);
        if (unlikely(err)) {
                DWARN("log_write err.ret %d\n", err);
        }

        // the write result is forwarded as the third argument
        chunk_post_commit(task->lsv_info, task->chunk_buf, err);

        DINFO("end ret %d\n", err);
        return 0;
}

/*
 * Hand a pre-commit chunk over to the log writer.
 *
 * The chunk must hold at least one page (page_idx > 0) and be in state
 * LSV_WBUF_CHUNK_PRE_COMMIT; both are asserted. When the
 * LSV_FEATURE_WBUF_PIPELINE bit is set in lsv_feature the chunk is appended
 * to the write-buffer pipeline. Otherwise a small context is allocated and
 * __do_wbuf2log is offered to the wbuf2log_stream queue.
 *
 * @param lsv_info   volume owning the write buffer
 * @param chunk_buf  chunk to be written to the log
 * @return 0 on success, otherwise the non-zero error from
 *         pipeline_add_tail(), ymalloc() or co_mq_offer()
 */
int lsv_wbuffer2log(lsv_volume_proto_t *lsv_info, lsv_chunk_buf_t *chunk_buf)
{
        int ret;
        lsv_wbuf_t *wbuf = lsv_info->wbuf;

        DINFO_NOP("begin\n");

        YASSERT(chunk_buf->page_idx > 0);
        YASSERT(chunk_buf->in_use == LSV_WBUF_CHUNK_PRE_COMMIT);

        if (!(lsv_feature & LSV_FEATURE_WBUF_PIPELINE)) {
                // queue path: wrap the arguments and schedule __do_wbuf2log
                lsv_wbuf2log_ctx_t *task;

                ret = ymalloc((void **)&task, sizeof(lsv_wbuf2log_ctx_t));
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                task->lsv_info = lsv_info;
                task->chunk_buf = chunk_buf;

                // NOTE(review): if the offer fails the context is not released
                // on this path. Confirm who owns it after a failed
                // co_mq_offer().
                ret = co_mq_offer(&wbuf->wbuf2log_stream, "wbuf2log", __do_wbuf2log, task, 1);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else {
                // pipeline path
                ret = pipeline_add_tail(&wbuf->pipeline, chunk_buf);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        DINFO_NOP("end ret %d\n", ret);
        return 0;
err_ret:
        return ret;
}

/*
 * Finish the commit of a chunk after its log write has completed.
 *
 * Marks the chunk LSV_WBUF_CHUNK_POST_COMMIT, advances the write buffer's
 * commit_lsn / commit_psn to the chunk's values, and wakes every waiter on
 * full_cond and flush_cond.
 *
 * @param lsv_info  volume owning the write buffer
 * @param chunk     chunk whose log write has finished
 * @param in_use    must be 0 (asserted). The caller visible in this file
 *                  passes the lsv_log_write() result here, so a failed log
 *                  write trips the assertion.
 * @return always 0
 */
int chunk_post_commit(lsv_volume_proto_t *lsv_info, lsv_chunk_buf_t *chunk, int in_use) {
        lsv_wbuf_t *wbuf = lsv_info->wbuf;

        // TODO: only the success case is handled so far
        YASSERT(in_use == 0);

        // Terminate the log statement explicitly, like every other DINFO call
        // in this file, instead of relying on how the macro expands.
        DINFO("ino %llu lsn %llu chunk %p %d page_idx %d in_use %d\n",
              (LLU)lsv_info->ino,
              (LLU)chunk->chunk_lsn,
              chunk,
              chunk->id,
              chunk->page_idx,
              in_use);

        // This call may yield, so it is done first, at the function entry.
        if (lsv_feature & LSV_FEATURE_WAL2) {
                lsv_wal2_segment_commit(lsv_info, chunk->chunk_lsn);
        }

        // The operations below must run atomically, without interruption.
        // Otherwise, under concurrency, the chunk may be reused and the
        // assertions below would fail.
        lsv_wbuffer_chunk_commit(lsv_info, chunk, LSV_WBUF_CHUNK_POST_COMMIT);

        // TODO commit lsn order
        YASSERT(wbuf->commit_lsn <= chunk->chunk_lsn);
        wbuf->commit_lsn = chunk->chunk_lsn;

        // TODO commit page order
        YASSERT(wbuf->commit_psn < chunk->chunk_psn);
        wbuf->commit_psn = chunk->chunk_psn;

        // Wake waiters on both conditions; lsv_wbuffer_do_flush() waits on
        // flush_cond.
        co_cond_broadcast(&wbuf->full_cond, 0);
        co_cond_broadcast(&wbuf->flush_cond, 0);

        return 0;
}

/*
 * Flush the write buffer and wait until everything issued so far is committed.
 *
 * Calls lsv_wbuffer_chunk_push_log(), snapshots last_psn / last_lsn, and then
 * waits on flush_cond until commit_psn has reached the snapshot PSN. The LSN
 * snapshot is only logged; the wait condition uses the PSN alone.
 *
 * @param lsv_info  volume owning the write buffer
 * @return always 0
 */
int lsv_wbuffer_do_flush(lsv_volume_proto_t *lsv_info) {
        lsv_wbuf_t *wb;
        uint64_t target_psn;
        uint64_t target_lsn;

        // NOTE(review): the (NULL, 1, 0) arguments presumably force the
        // current chunk out to the log. Confirm against
        // lsv_wbuffer_chunk_push_log().
        lsv_wbuffer_chunk_push_log(lsv_info, NULL, 1, 0);

        // TODO: if commit_lsn falls in the middle of an IO, does that affect
        // correctness?
        // TODO wait, use psn
        wb = (lsv_wbuf_t *)lsv_info->wbuf;

        // Snapshot point; it must not span an IO.
        target_psn = wb->last_psn;
        target_lsn = wb->last_lsn;

        // Log the progress once up front and again after every wake-up, and
        // leave as soon as the committed PSN has caught up with the snapshot.
        for (;;) {
                DINFO("ino %llu flush psn %llu commit_psn %llu lsn %llu commit_lsn %llu\n",
                      (LLU)lsv_info->ino,
                      (LLU)target_psn, (LLU)wb->commit_psn,
                      (LLU)target_lsn, (LLU)wb->commit_lsn);

                if (wb->commit_psn >= target_psn)
                        break;

                co_cond_wait(&wb->flush_cond);
        }

        return 0;
}
